package edu.cmu.ml.rtw.users.matt.randomwalks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.MultiFileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import org.apache.mahout.common.IntPairWritable;

/**
 * Input format for the binary walk file generated by GraphChiWalk.
 *
 * <p>The file is read as a sequence of fixed-width, 10-byte records: a 4-byte int walk id, a
 * 2-byte short hop number, and a 4-byte int vertex id, decoded with the {@code DataInput}
 * methods of the underlying stream.  Each record is emitted with the walk id as the key and the
 * pair (hop, vertex) as the value.
 *
 * <p>NOTE(review): an earlier version of this comment said this class "produces a matrix of
 * (node pair) X (path feature)".  Nothing in this class builds such a matrix; presumably that is
 * done by the job that consumes these records - confirm against the caller.
 */
public class WalkFileInputFormat extends FileInputFormat<IntWritable, IntPairWritable> {
    private static final Logger log = Logger.getLogger(WalkFileInputFormat.class);

    /**
     * Walk files may always be split: records are fixed width, so each reader can align itself
     * to the first record boundary inside its split (see {@link WalkFileReader}).
     */
    @Override
    protected boolean isSplitable(FileSystem file_system, Path filename) {
        return true;
    }

    /**
     * Creates a reader for one split of a walk file.
     *
     * @param split the split to read; must be a {@link FileSplit}
     * @param conf job configuration, used to resolve the file system of the split's path
     * @param reporter unused
     * @return a reader positioned at the first whole record that starts inside the split
     * @throws IOException if the file cannot be opened or positioned
     */
    @Override
    public RecordReader<IntWritable, IntPairWritable> getRecordReader(InputSplit split,
            JobConf conf, Reporter reporter) throws IOException {
        return new WalkFileReader(conf, (FileSplit) split);
    }

    /**
     * Reads the fixed-width walk records whose first byte lies inside a single file split.
     *
     * <p>A record that starts before the end of the split is read in full by this reader even
     * if it runs past the split end; the reader for the following split skips forward to the
     * next record boundary, so every record is read exactly once.
     */
    public static class WalkFileReader implements RecordReader<IntWritable, IntPairWritable> {
        /** 4 bytes for an int, 2 bytes for a short, and 4 bytes for an int. */
        private static final long RECORD_SIZE = 10L;

        private final Path path;
        // Offset of the first record boundary at or after the split's start.
        private final long start;
        // Offset of the first byte after the split (NOT aligned to a record boundary).
        private final long end;
        private long pos;
        // Set to null once the stream has been closed.
        private FSDataInputStream in_stream;

        /**
         * Opens the split's file and positions the stream on the first record boundary at or
         * after the start of the split.
         *
         * @param conf configuration used to resolve the file system that owns the split's path
         * @param split the portion of the walk file this reader is responsible for
         * @throws IOException if the file cannot be opened or the seek fails
         */
        public WalkFileReader(Configuration conf, FileSplit split) throws IOException {
            path = split.getPath();
            long split_start = split.getStart();
            end = split_start + split.getLength();
            long offset = split_start % RECORD_SIZE;
            start = (offset == 0) ? split_start : split_start + (RECORD_SIZE - offset);
            pos = start;

            // Resolve the file system from the path itself rather than using the default file
            // system, so that inputs on a non-default file system can be read.
            FSDataInputStream stream = path.getFileSystem(conf).open(path);
            // If alignment pushed the start to or past the end of the split there is no record
            // to read, so do not seek: the target may lie past the end of the file.
            if (start < end) {
                try {
                    stream.seek(start);
                } catch (IOException e) {
                    // Do not leak the stream when construction fails.
                    try {
                        stream.close();
                    } catch (IOException close_error) {
                        log.warn("Failed to close " + path + " after failed seek", close_error);
                    }
                    throw e;
                }
            }
            in_stream = stream;
        }

        /**
         * Reads the next record into {@code key} and {@code value}.
         *
         * @param key receives the walk id
         * @param value receives the pair (hop, vertex)
         * @return true if a record was read; false once the split is exhausted, the reader has
         *     been closed, or a read error occurred (the error is logged)
         */
        @Override
        public boolean next(IntWritable key, IntPairWritable value) {
            if (in_stream == null) {
                return false;
            }
            if (readRecord(key, value)) {
                return true;
            }
            // Nothing more to read from this split; release the stream right away.
            close();
            return false;
        }

        /**
         * Decodes one record at the current position, if that position is inside the split.
         *
         * @return true if a record was decoded; false at the end of the split or on a read error
         */
        private boolean readRecord(IntWritable key, IntPairWritable value) {
            if (pos >= end) {
                return false;
            }
            try {
                int walk_id = in_stream.readInt();
                int hop = in_stream.readShort();
                int vertex = in_stream.readInt();
                key.set(walk_id);
                // Hop needs to be first here, because we're going to sort these values later,
                // and we want them sorted by hop.
                value.set(hop, vertex);
                pos += RECORD_SIZE;
                return true;
            } catch (IOException e) {
                // next() does not declare IOException here, so the failure is reported as the
                // end of input; log it so that truncated input does not go unnoticed.
                log.error("Failed to read walk record at byte " + pos + " of " + path
                        + "; treating as end of split", e);
                return false;
            }
        }

        /** Closes the underlying stream.  Safe to call more than once. */
        @Override
        public void close() {
            if (in_stream == null) {
                return;
            }
            try {
                in_stream.close();
            } catch (IOException e) {
                log.warn("Failed to close " + path, e);
            }
            in_stream = null;
        }

        @Override
        public IntWritable createKey() {
            return new IntWritable();
        }

        @Override
        public IntPairWritable createValue() {
            return new IntPairWritable();
        }

        /** Returns the byte offset in the file of the next record to be read. */
        @Override
        public long getPos() {
            return pos;
        }

        /**
         * Returns the fraction of the split consumed so far, in the range [0, 1].
         *
         * <p>Returns 0 when the split contains no record boundary (aligned start at or past the
         * split end), which also avoids dividing by a non-positive span.
         */
        @Override
        public float getProgress() {
            if (end <= start) {
                return 0.0f;
            }
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }
}
